-
Notifications
You must be signed in to change notification settings - Fork 282
fix: add explicit sort for window aggregates to fix correctness issues #3397
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
The core issue was that BoundedWindowAggExec requires InputOrderMode::Sorted but the input wasn't always properly sorted when ORDER BY was present. Changes: - Add explicit SortExec before BoundedWindowAggExec when ORDER BY is present - Change getSupportLevel from blanket Incompatible to Compatible for valid cases - Properly detect unsupported case: partition exprs must be subset of order exprs - Disable window by default (spark.comet.exec.window.enabled=false) to avoid breaking changes; users can opt-in to test the fix What now works natively (when enabled): - COUNT, SUM, MIN, MAX window aggregates - OVER() - no partition, no order - OVER(ORDER BY x) - order only - OVER(PARTITION BY x) - partition only - OVER(PARTITION BY x ORDER BY x, y) - partition is subset of order Tracking issue: apache#2721 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
3c6710a to
0787235
Compare
|
I need to regenerate the golden files |
| createExecEnabledConfig("explode", defaultValue = true) | ||
| val COMET_EXEC_WINDOW_ENABLED: ConfigEntry[Boolean] = | ||
| createExecEnabledConfig("window", defaultValue = true) | ||
| createExecEnabledConfig("window", defaultValue = false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ouch I thought window was disabled
|
|
||
| // Ensure input is properly sorted when ORDER BY is present | ||
| // BoundedWindowAggExec requires InputOrderMode::Sorted | ||
| let needs_explicit_sort = !sort_exprs.is_empty(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
needs_explicit_sort confusing IMO.
can we just make a simple
if sort_exprs.is_empty() {plan } else { sort plan }
|
|
||
| override def getSupportLevel(op: WindowExec): SupportLevel = { | ||
| Incompatible(Some("Native WindowExec has known correctness issues")) | ||
| // DataFusion requires that partition expressions must be part of the sort ordering. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if its true, need to check
Summary
This PR fixes core correctness issues with windowed aggregate queries by adding an explicit
SortExecbeforeBoundedWindowAggExecwhen ORDER BY is present.Tracking Issue: #2721
Changes
Add explicit SortExec (
planner.rs) - Insert sort beforeBoundedWindowAggExecwhen ORDER BY is present, ensuringInputOrderMode::Sortedrequirement is satisfiedImprove support level detection (
CometWindowExec.scala) - Change from blanketIncompatibletoCompatiblefor valid cases, with proper validation that partition expressions must be a subset of order expressionsDisable by default (
CometConf.scala) - Setspark.comet.exec.window.enabled=falseto avoid breaking changes; users can opt-in to testWhat's Now Supported (when enabled)
COUNT,SUM,MIN,MAXOVER()- no partition, no orderOVER(ORDER BY x)- order onlyOVER(PARTITION BY x)- partition onlyOVER(PARTITION BY x ORDER BY x, y)- partition is subset of orderWhat's NOT Supported (falls back to Spark)
PARTITION BY a ORDER BY bwhere partition columns differ from order columnsAVGwindow aggregate (native implementation has known issues)ROW_NUMBER,RANK,DENSE_RANK, etc.LAG,LEADFIRST_VALUE,LAST_VALUE,NTH_VALUERANGE BETWEENwith numeric/temporal expressions (Invalid argument error: Invalid arithmetic operation: Int32 - Int64 #1246)Test Plan
🤖 Generated with Claude Code